{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Neeps- Medium"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Loading spark-stubs, spark-hive\n",
      "Adding Hive conf dir /opt/hive/conf to classpath\n",
      "Creating SparkSession\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "SLF4J: No SLF4J providers were found.\n",
      "SLF4J: Defaulting to no-operation (NOP) logger implementation\n",
      "SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<a target=\"_blank\" href=\"http://jupyter:4040\">Spark UI</a>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "\u001b[32mimport \u001b[39m\u001b[36m$ivy.$                                  \n",
       "\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.log4j.{Level, Logger}\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.types._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.functions._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.expressions.Window\n",
       "\n",
       "\u001b[39m\n",
       "\u001b[36mspark\u001b[39m: \u001b[32mSparkSession\u001b[39m = org.apache.spark.sql.SparkSession@6ce4ed35"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import $ivy.`org.apache.spark::spark-sql:3.4.0`\n",
    "\n",
    "import org.apache.log4j.{Level, Logger}\n",
    "Logger.getLogger(\"org\").setLevel(Level.OFF)\n",
    "\n",
    "import org.apache.spark._\n",
    "import org.apache.spark.sql._\n",
    "import org.apache.spark.sql.types._\n",
    "import org.apache.spark.sql.functions._\n",
    "import org.apache.spark.sql.expressions.Window\n",
    "\n",
    "val spark = {\n",
    "    NotebookSparkSession.builder()\n",
    "    .progress(false)\n",
    "    .appName(\"app15-2\")\n",
    "    // .master(\"spark://192.168.31.31:7077\")\n",
    "    .master(\"local[*]\")\n",
    "    .config(\"spark.sql.warehouse.dir\", \n",
    "            \"hdfs://192.168.31.31:9000/user/hive/warehouse\") \n",
    "    .config(\"spark.cores.max\", \"4\") \n",
    "    .config(\"spark.executor.instances\", \"1\") \n",
    "    .config(\"spark.executor.cores\", \"2\") \n",
    "    .config(\"spark.executor.memory\", \"10g\") \n",
    "    .config(\"spark.shuffle.service.enabled\", \"false\") \n",
    "    .config(\"spark.dynamicAllocation.enabled\", \"false\") \n",
    "    .config(\"spark.sql.catalogImplementation\", \"hive\")\n",
    "    .config(\"spark.sql.repl.eagerEval.enabled\", \"true\")\n",
    "    .config(\"spark.driver.allowMultipleContexts\", \"true\")\n",
    "    .getOrCreate()\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "\u001b[32mimport \u001b[39m\u001b[36mspark.implicits._\n",
       "\u001b[39m\n",
       "defined \u001b[32mfunction\u001b[39m \u001b[36msc\u001b[39m\n",
       "\u001b[36mhiveCxt\u001b[39m: \u001b[32msql\u001b[39m.\u001b[32mhive\u001b[39m.\u001b[32mHiveContext\u001b[39m = org.apache.spark.sql.hive.HiveContext@308745e5"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import spark.implicits._\n",
    "def sc = spark.sparkContext\n",
    "val hiveCxt = new org.apache.spark.sql.hive.HiveContext(sc)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "defined \u001b[32mclass\u001b[39m \u001b[36mRichDF\u001b[39m"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "// Credit to Aivean\n",
    "implicit class RichDF(val ds:DataFrame) {\n",
    "    def showHTML(limit: Int = 50, truncate: Int = 100) = {\n",
    "        import xml.Utility.escape\n",
    "        val data = ds.take(limit)\n",
    "        val header = ds.schema.fieldNames.toSeq        \n",
    "        val rows: Seq[Seq[String]] = data.map { row =>\n",
    "          row.toSeq.map {cell =>\n",
    "            val str = cell match {\n",
    "              case null => \"null\"\n",
    "              case binary: Array[Byte] => binary.map(\"%02X\".format(_)).mkString(\"[\", \" \", \"]\")\n",
    "              case array: Array[_] => array.mkString(\"[\", \", \", \"]\")\n",
    "              case seq: Seq[_] => seq.mkString(\"[\", \", \", \"]\")\n",
    "              case _ => cell.toString\n",
    "            }\n",
    "            if (truncate > 0 && str.length > truncate) {\n",
    "              // do not show ellipses for strings shorter than 4 characters.\n",
    "              if (truncate < 4) str.substring(0, truncate)\n",
    "              else str.substring(0, truncate - 3) + \"...\"\n",
    "            } else {\n",
    "              str\n",
    "            }\n",
    "          }: Seq[String]\n",
    "        }\n",
    "    publish.html(s\"\"\" <table>\n",
    "                <tr>\n",
    "                 ${header.map(h => s\"<th>${escape(h)}</th>\").mkString}\n",
    "                </tr>\n",
    "                ${rows.map {row =>\n",
    "                  s\"<tr>${row.map{c => s\"<td>${escape(c)}</td>\" }.mkString}</tr>\"\n",
    "                }.mkString}\n",
    "            </table>\n",
    "        \"\"\")\n",
    "    }\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "\u001b[36mut_staff\u001b[39m: \u001b[32mDataFrame\u001b[39m = [id: string, name: string]\n",
       "\u001b[36mut_student\u001b[39m: \u001b[32mDataFrame\u001b[39m = [id: string, name: string ... 2 more fields]\n",
       "\u001b[36mut_event\u001b[39m: \u001b[32mDataFrame\u001b[39m = [id: string, modle: string ... 5 more fields]\n",
       "\u001b[36mut_room\u001b[39m: \u001b[32mDataFrame\u001b[39m = [id: string, name: string ... 2 more fields]\n",
       "\u001b[36mut_attends\u001b[39m: \u001b[32mDataFrame\u001b[39m = [student: string, event: string]\n",
       "\u001b[36mut_teaches\u001b[39m: \u001b[32mDataFrame\u001b[39m = [staff: string, event: string]\n",
       "\u001b[36mut_occurs\u001b[39m: \u001b[32mDataFrame\u001b[39m = [event: string, week: string]\n",
       "\u001b[36mut_modle\u001b[39m: \u001b[32mDataFrame\u001b[39m = [id: string, name: string]\n",
       "\u001b[36mut_week\u001b[39m: \u001b[32mDataFrame\u001b[39m = [id: string, wkstart: string]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val ut_staff = hiveCxt.table(\"sqlzoo.ut_staff\")\n",
    "val ut_student = hiveCxt.table(\"sqlzoo.ut_student\")\n",
    "val ut_event = hiveCxt.table(\"sqlzoo.ut_event\")\n",
    "val ut_room = hiveCxt.table(\"sqlzoo.ut_room\")\n",
    "val ut_attends = hiveCxt.table(\"sqlzoo.ut_attends\")\n",
    "val ut_teaches = hiveCxt.table(\"sqlzoo.ut_teaches\")\n",
    "val ut_occurs = hiveCxt.table(\"sqlzoo.ut_occurs\")\n",
    "val ut_modle = hiveCxt.table(\"sqlzoo.ut_modle\")\n",
    "val ut_week = hiveCxt.table(\"sqlzoo.ut_week\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 6.\n",
    "**Show the 'size' of each of the co72010 events. Size is the total number of students attending each event.**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>event</th><th>sum(sze)</th>\n",
       "                </tr>\n",
       "                <tr><td>co72013.L01</td><td>215</td></tr><tr><td>co72013.T02</td><td>49</td></tr><tr><td>co72013.T04</td><td>45</td></tr><tr><td>co72013.T01</td><td>40</td></tr><tr><td>co72013.T05</td><td>35</td></tr><tr><td>co72013.T06</td><td>29</td></tr><tr><td>co72013.T03</td><td>10</td></tr><tr><td>co72013.L02</td><td>10</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "(ut_event.withColumnRenamed(\"id\", \"event\")\n",
    " .filter(ut_event(\"modle\")===\"co72010\")\n",
    " .join(ut_attends, \"event\")\n",
    " .join(ut_student, (ut_attends(\"student\")===ut_student(\"id\")))\n",
    " .groupBy(\"event\")\n",
    " .sum(\"sze\")\n",
    " .orderBy(col(\"sum(sze)\").desc)\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 7.\n",
    "**For each post-graduate module, show the size of the teaching team. (post graduate modules start with the code co7).**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>id</th><th>count(staff)</th>\n",
       "                </tr>\n",
       "                <tr><td>co72003</td><td>2</td></tr><tr><td>co72011</td><td>2</td></tr><tr><td>co72018</td><td>1</td></tr><tr><td>co72017</td><td>1</td></tr><tr><td>co72026</td><td>2</td></tr><tr><td>co72016</td><td>2</td></tr><tr><td>co72033</td><td>1</td></tr><tr><td>co72010</td><td>2</td></tr><tr><td>co72006</td><td>1</td></tr><tr><td>co72021</td><td>1</td></tr><tr><td>co72004</td><td>1</td></tr><tr><td>co72005</td><td>2</td></tr><tr><td>co72012</td><td>1</td></tr><tr><td>co72002</td><td>1</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "(ut_teaches\n",
    " .join(ut_event.withColumnRenamed(\"id\", \"event\"), \"event\")\n",
    " .join(ut_modle.filter(ut_modle(\"id\").like(\"co7%\")), \n",
    "       (ut_event(\"modle\")===ut_modle(\"id\")))\n",
    " .select(\"id\", \"staff\")\n",
    " .distinct()\n",
    " .groupBy(\"id\")\n",
    " .agg(count(\"staff\"))\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 8.\n",
    "**Give the full name of those modules which include events taught for fewer than 10 weeks.**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>modle_name</th>\n",
       "                </tr>\n",
       "                <tr><td>Interactivity and the Internet</td></tr><tr><td>Internet Multimedia</td></tr><tr><td>Project</td></tr><tr><td>Languages and Algorithms</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "(ut_modle.withColumnRenamed(\"name\", \"modle_name\")\n",
    " .join(ut_event.withColumnRenamed(\"id\", \"event\"), \n",
    "       (ut_modle(\"id\")===ut_event(\"modle\")))\n",
    " .join(ut_occurs, \"event\")\n",
    " .groupBy(\"event\", \"modle_name\")\n",
    " .count()\n",
    " .orderBy(\"modle_name\")\n",
    " .filter(col(\"count\")<10)\n",
    " .select(\"modle_name\")\n",
    " .distinct()\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 9.\n",
    "**Identify those events which start at the same time as one of the co72010 lectures.**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>id</th>\n",
       "                </tr>\n",
       "                <tr><td>co12004.T04</td></tr><tr><td>co12004.T05</td></tr><tr><td>co12005.T01</td></tr><tr><td>co12005.T04</td></tr><tr><td>co12006.L03</td></tr><tr><td>co12008.L01</td></tr><tr><td>co12012.T01</td></tr><tr><td>co22005.T02</td></tr><tr><td>co22005.T04</td></tr><tr><td>co22005.T07</td></tr><tr><td>co22005.T08</td></tr><tr><td>co22006.L02</td></tr><tr><td>co22008.T03</td></tr><tr><td>co22008.T04</td></tr><tr><td>co32011.T03</td></tr><tr><td>co32014.T01</td></tr><tr><td>co32016.L01</td></tr><tr><td>co32018.L01</td></tr><tr><td>co32021.L01</td></tr><tr><td>co42001.L01</td></tr><tr><td>co42010.T01</td></tr><tr><td>co42015.T01</td></tr><tr><td>co72006.L01</td></tr><tr><td>co72016.T01</td></tr><tr><td>co72018.T01</td></tr><tr><td>coh2451.T01</td></tr><tr><td>coh8412555.L01</td></tr><tr><td>coh8412555.T02</td></tr><tr><td>coh8412585.T03</td></tr><tr><td>coh8412605.L01</td></tr><tr><td>coh8412605.T01</td></tr><tr><td>coh8412615.T03</td></tr><tr><td>coh8412615.T05</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "\u001b[36mt\u001b[39m: \u001b[32mDataFrame\u001b[39m = [id: string, modle: string ... 8 more fields]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val t = (ut_event\n",
    "     .join(ut_occurs, (ut_event(\"id\")===ut_occurs(\"event\")))\n",
    "     .withColumn(\"time\", concat_ws(\"\", col(\"week\"), col(\"dow\"), col(\"tod\"))))\n",
    "(t.filter(col(\"modle\") !== \"co72010\")\n",
    " .join(t.filter(col(\"modle\") === \"co72010\")\n",
    "       .select(\"time\"), \"time\")\n",
    " .select(\"id\")\n",
    " .distinct()\n",
    " .orderBy(\"id\")\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 10.\n",
    "**How many members of staff have contact time which is greater than the average?**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>count</th>\n",
       "                </tr>\n",
       "                <tr><td>19</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "\u001b[36mt\u001b[39m: \u001b[32mDataFrame\u001b[39m = [staff: string, duration: bigint]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val t = (ut_event.join(ut_teaches, (ut_event(\"id\")===ut_teaches(\"event\")))\n",
    "     .join(ut_occurs, (ut_event(\"id\")===ut_occurs(\"event\")))\n",
    "     .groupBy(\"staff\")\n",
    "     .agg(sum(\"duration\").alias(\"duration\")))\n",
    "\n",
    "(t.filter(col(\"duration\") > t.groupBy().sum(\"duration\").collect()(0)(0).asInstanceOf[Long] /\n",
    "          ut_teaches.select(\"staff\").distinct().count())\n",
    " .groupBy()\n",
    " .count()\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Scala",
   "language": "scala",
   "name": "scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".sc",
   "mimetype": "text/x-scala",
   "name": "scala",
   "nbconvert_exporter": "script",
   "version": "2.12.15"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
